#ifndef XRPL_SERVER_BASEWSPEER_H_INCLUDED
#define XRPL_SERVER_BASEWSPEER_H_INCLUDED

#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/utility/instrumentation.h>
#include <xrpl/beast/utility/rngfill.h>
#include <xrpl/crypto/csprng.h>
#include <xrpl/protocol/BuildInfo.h>
#include <xrpl/server/WSSession.h>
#include <xrpl/server/detail/BasePeer.h>
#include <xrpl/server/detail/LowestLayer.h>

#include <boost/asio/error.hpp>
#include <boost/beast/core/multi_buffer.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/logic/tribool.hpp>

#include <functional>
#include <list>

namespace ripple {

/** Represents an active WebSocket connection.

    Holds the WebSocket session logic that does not depend on the concrete
    stream type. `Impl` is the most-derived class: it is reached through
    impl() with a static_cast, and the code in this header expects it to
    provide a `ws_` stream member and `shared_from_this()`.

    run(), send(), close() and complete() re-post themselves onto `strand_`
    when they are called from outside of it; fail() asserts that it is
    running on the strand.
*/
template <class Handler, class Impl>
class BaseWSPeer : public BasePeer<Handler, Impl>, public WSSession
{
protected:
    using clock_type = std::chrono::system_clock;
    using error_code = boost::system::error_code;
    using endpoint_type = boost::asio::ip::tcp::endpoint;
    using waitable_timer = boost::asio::basic_waitable_timer<clock_type>;
    using BasePeer<Handler, Impl>::strand_;

private:
    friend class BasePeer<Handler, Impl>;

    /// The HTTP request given to the constructor; passed to async_accept
    /// in run() and exposed through request().
    http_request_type request_;
    /// Buffer that async_read fills; handed to the handler in on_read().
    boost::beast::multi_buffer rb_;
    /// Not referenced by the code in this header.
    boost::beast::multi_buffer wb_;
    /// Outgoing messages. The front entry is the one on_write() works on;
    /// it is popped in on_write_fin().
    std::list<std::shared_ptr<WSMsg>> wq_;
    /// The socket has been closed, or will close after the next write
    /// finishes. Do not do any more writes, and don't try to close
    /// again.
    bool do_close_ = false;
    /// Close reason used by on_write_fin() when a close was requested
    /// while messages were still queued.
    boost::beast::websocket::close_reason cr_;
    /// Timer armed by start_timer(); its expiry is handled by on_timer().
    waitable_timer timer_;
    /// on_timer() fails the connection with `timed_out` only when both
    /// this flag and ping_active_ are set.
    bool close_on_timer_ = false;
    /// Set when on_timer() issues a ping, cleared by on_ping() when the
    /// ping operation completes without being aborted.
    bool ping_active_ = false;
    /// Payload of the ping sent from on_timer(); on_ping_pong() compares
    /// incoming pong payloads against it.
    boost::beast::websocket::ping_data payload_;
    /// First error recorded by fail(); once set, later failures are not
    /// recorded.
    error_code ec_;
    /// Storage for the control-frame callback that run() installs on the
    /// stream.
    std::function<
        void(boost::beast::websocket::frame_type, boost::beast::string_view)>
        control_callback_;

public:
    /// Construct from the port, handler, executor, timer, remote address,
    /// the HTTP request that started the session, and a journal.
    template <class Body, class Headers>
    BaseWSPeer(
        Port const& port,
        Handler& handler,
        boost::asio::executor const& executor,
        waitable_timer timer,
        endpoint_type remote_address,
        boost::beast::http::request<Body, Headers>&& request,
        beast::Journal journal);

    /// Configure the stream, arm the timer and start the asynchronous
    /// WebSocket accept.
    void
    run() override;

    //
    // WSSession
    //

    /// The port this connection belongs to.
    Port const&
    port() const override
    {
        return this->port_;
    }

    /// The HTTP request stored at construction.
    http_request_type const&
    request() const override
    {
        return this->request_;
    }

    /// The remote endpoint of this connection.
    boost::asio::ip::tcp::endpoint const&
    remote_endpoint() const override
    {
        return this->remote_address_;
    }

    /// Queue a message for sending; ignored once a close is pending.
    void
    send(std::shared_ptr<WSMsg> w) override;

    /// Close with a default-constructed close reason.
    void
    close() override;

    /// Close with the given reason, either immediately or after the
    /// message currently at the front of the write queue is finished.
    void
    close(boost::beast::websocket::close_reason const& reason) override;

    /// Start the next asynchronous read.
    void
    complete() override;

protected:
    /// Access the most-derived object.
    Impl&
    impl()
    {
        return *static_cast<Impl*>(this);
    }

    /// Completion handler for the asynchronous accept started in run().
    void
    on_ws_handshake(error_code const& ec);

    /// Continue writing the front message; re-posts onto the strand if
    /// needed.
    void
    do_write();

    /// Prepare and write the next piece of the front message.
    void
    on_write(error_code const& ec);

    /// Called after the final piece of a message was written.
    void
    on_write_fin(error_code const& ec);

    /// Start an asynchronous read into rb_.
    void
    do_read();

    /// Completion handler for the asynchronous read.
    void
    on_read(error_code const& ec);

    /// Called when the connection has been closed; cancels the timer.
    void
    on_close(error_code const& ec);

    /// (Re)arm timer_ and wait for it asynchronously.
    void
    start_timer();

    /// Cancel timer_, ignoring any exception thrown by the cancel.
    void
    cancel_timer();

    /// Completion handler for the ping sent from on_timer().
    void
    on_ping(error_code const& ec);

    /// Control-frame callback; reacts to pong frames only.
    void
    on_ping_pong(
        boost::beast::websocket::frame_type kind,
        boost::beast::string_view payload);

    /// Timer expiry: either sends a ping and re-arms the timer, or fails
    /// the connection.
    void
    on_timer(error_code ec);

    /// Record and log the first error and close the underlying socket.
    template <class String>
    void
    fail(error_code ec, String const& what);
};

//------------------------------------------------------------------------------

/** Construct a WebSocket peer.

    `port`, `handler`, `executor`, `remote_address` and `journal` are
    forwarded to the BasePeer base class. `request` is moved into
    `request_` and `timer` is moved into `timer_`.

    `payload_` is initialised from an 8 character literal purely to give it
    a size of 8; on_timer() later overwrites those bytes before each ping.
*/
template <class Handler, class Impl>
template <class Body, class Headers>
BaseWSPeer<Handler, Impl>::BaseWSPeer(
    Port const& port,
    Handler& handler,
    boost::asio::executor const& executor,
    waitable_timer timer,
    endpoint_type remote_address,
    boost::beast::http::request<Body, Headers>&& request,
    beast::Journal journal)
    : BasePeer<Handler, Impl>(port, handler, executor, remote_address, journal)
    , request_(std::move(request))
    , timer_(std::move(timer))
    , payload_("12345678")  // ensures size is 8 bytes
{
}

/** Begin servicing the connection.

    Re-posts itself onto the strand when called from elsewhere. On the
    strand it applies the port's permessage-deflate options, installs the
    control-frame callback, arms the timer (with close_on_timer_ set), adds
    a decorator that sets the `Server` field of the response, and finally
    starts the asynchronous WebSocket accept of the stored request.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::run()
{
    if (!strand_.running_in_this_thread())
    {
        post(strand_, std::bind(&BaseWSPeer::run, impl().shared_from_this()));
        return;
    }

    auto& stream = impl().ws_;
    stream.set_option(port().pmd_options);

    // The callable is kept in a member so that its storage is managed by
    // this object rather than by the stream's `control_callback` function.
    control_callback_ = [this](
                            boost::beast::websocket::frame_type kind,
                            boost::beast::string_view payload) {
        this->on_ping_pong(kind, payload);
    };
    stream.control_callback(control_callback_);

    start_timer();
    close_on_timer_ = true;

    stream.set_option(
        boost::beast::websocket::stream_base::decorator([](auto& response) {
            response.set(
                boost::beast::http::field::server,
                BuildInfo::getFullVersionString());
        }));

    stream.async_accept(
        request_,
        bind_executor(
            strand_,
            [self = impl().shared_from_this()](error_code const& accept_ec) {
                self->on_ws_handshake(accept_ec);
            }));
}

/** Queue a message for delivery.

    Re-posts itself onto the strand when called from elsewhere. Nothing is
    queued once a close is pending. If the queue already holds more than
    `port().ws_queue_limit` messages, everything except the front entry
    (the message on_write() is working on) is dropped and the connection is
    closed with a policy error. Otherwise the message is appended, and the
    write chain is started if the queue was empty.

    @param w The message to send.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::send(std::shared_ptr<WSMsg> w)
{
    if (!strand_.running_in_this_thread())
    {
        post(
            strand_,
            std::bind(
                &BaseWSPeer::send, impl().shared_from_this(), std::move(w)));
        return;
    }

    if (do_close_)
        return;

    if (wq_.size() <= port().ws_queue_limit)
    {
        wq_.emplace_back(std::move(w));
        // Only the first entry needs to kick off writing; later entries
        // are picked up by on_write_fin().
        if (wq_.size() == 1)
            on_write({});
        return;
    }

    // The client is not draining its queue fast enough.
    cr_.code = safe_cast<decltype(cr_.code)>(
        boost::beast::websocket::close_code::policy_error);
    cr_.reason = "Policy error: client is too slow.";
    JLOG(this->j_.info()) << cr_.reason;
    auto const first_dropped = std::next(wq_.begin());
    wq_.erase(first_dropped, wq_.end());
    close(cr_);
}

/** Close the connection with a default-constructed close reason. */
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::close()
{
    boost::beast::websocket::close_reason const no_reason{};
    close(no_reason);
}

/** Close the connection with the given reason.

    Re-posts itself (with a copy of `reason`) onto the strand when called
    from elsewhere. Only the first call has any effect. If messages are
    still queued, the reason is stored in `cr_` and the close is left to
    on_write_fin(); otherwise the asynchronous close starts right away.

    @param reason The close reason to send to the remote end.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::close(
    boost::beast::websocket::close_reason const& reason)
{
    if (!strand_.running_in_this_thread())
    {
        post(strand_, [peer = impl().shared_from_this(), reason] {
            peer->close(reason);
        });
        return;
    }

    if (do_close_)
        return;
    do_close_ = true;

    if (!wq_.empty())
    {
        // A write is outstanding; on_write_fin() will issue the close.
        cr_ = reason;
        return;
    }

    impl().ws_.async_close(
        reason,
        bind_executor(
            strand_,
            std::bind(
                &BaseWSPeer::on_close,
                impl().shared_from_this(),
                std::placeholders::_1)));
}

/** Start the next read.

    Re-posts itself onto the strand when called from elsewhere; on the
    strand it simply calls do_read().
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::complete()
{
    if (strand_.running_in_this_thread())
    {
        do_read();
        return;
    }

    post(strand_, std::bind(&BaseWSPeer::complete, impl().shared_from_this()));
}

/** Completion handler for the asynchronous accept started in run().

    On error the connection is failed. On success close_on_timer_ is
    cleared and the first read is started.

    @param ec Result of the accept operation.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_ws_handshake(error_code const& ec)
{
    if (ec)
    {
        fail(ec, "on_ws_handshake");
        return;
    }

    close_on_timer_ = false;
    do_read();
}

/** Continue writing the message at the front of the queue.

    Re-posts itself onto the strand when called from elsewhere; on the
    strand it calls on_write() with a success code. on_write() hands this
    function to WSMsg::prepare as its callback.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::do_write()
{
    if (strand_.running_in_this_thread())
    {
        on_write({});
        return;
    }

    post(strand_, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
}

/** Write the next piece of the message at the front of the queue.

    Asks the message to prepare up to 65536 bytes, passing do_write() as
    the callback. The first member of the result is a tribool:
      - indeterminate: nothing is written now and this function returns;
      - false: the piece is written as a non-final fragment and this
        function runs again when the write completes;
      - true: the piece is written as the final fragment and
        on_write_fin() runs when the write completes.
    The timer is re-armed before every write.

    @param ec Result of the previous write, or success when starting.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_write(error_code const& ec)
{
    if (ec)
    {
        fail(ec, "write");
        return;
    }

    auto& msg = *wq_.front();
    auto const piece = msg.prepare(
        65536, std::bind(&BaseWSPeer::do_write, impl().shared_from_this()));
    if (boost::indeterminate(piece.first))
        return;

    start_timer();

    // Past the indeterminate check the tribool is either true or false.
    bool const fin = static_cast<bool>(piece.first);
    auto const next = fin ? &BaseWSPeer::on_write_fin : &BaseWSPeer::on_write;
    impl().ws_.async_write_some(
        fin,
        piece.second,
        bind_executor(
            strand_,
            std::bind(
                next, impl().shared_from_this(), std::placeholders::_1)));
}

/** Called after the final fragment of a message has been written.

    Removes the finished message from the queue. If a close is pending the
    asynchronous close is started with the stored reason `cr_`; otherwise
    writing continues with the next queued message, if there is one.

    @param ec Result of the write operation.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_write_fin(error_code const& ec)
{
    if (ec)
    {
        fail(ec, "write_fin");
        return;
    }

    wq_.pop_front();

    if (!do_close_)
    {
        if (!wq_.empty())
            on_write({});
        return;
    }

    impl().ws_.async_close(
        cr_,
        bind_executor(
            strand_,
            [self = impl().shared_from_this()](error_code const& close_ec) {
                self->on_close(close_ec);
            }));
}

/** Start an asynchronous read of the next message into `rb_`.

    Re-posts itself onto the strand when called from elsewhere. on_read()
    is invoked on the strand when the read completes.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::do_read()
{
    if (!strand_.running_in_this_thread())
    {
        post(
            strand_,
            std::bind(&BaseWSPeer::do_read, impl().shared_from_this()));
        return;
    }

    auto self = impl().shared_from_this();
    impl().ws_.async_read(
        rb_,
        bind_executor(
            strand_,
            std::bind(
                &BaseWSPeer::on_read, std::move(self), std::placeholders::_1)));
}

/** Completion handler for the asynchronous read.

    A `closed` result is treated as a normal close; any other error fails
    the connection. Otherwise the buffers that make up the received data
    are collected into a vector, passed to the handler's onWSMessage, and
    the read buffer is then emptied.

    @param ec Result of the read operation.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_read(error_code const& ec)
{
    if (ec == boost::beast::websocket::error::closed)
    {
        on_close({});
        return;
    }
    if (ec)
    {
        fail(ec, "read");
        return;
    }

    auto const& data = rb_.data();
    std::vector<boost::asio::const_buffer> b;
    b.reserve(std::distance(data.begin(), data.end()));
    for (auto it = data.begin(); it != data.end(); ++it)
        b.push_back(*it);

    this->handler_.onWSMessage(impl().shared_from_this(), b);
    rb_.consume(rb_.size());
}

/** Called once the connection has been closed.

    The result of the close is not inspected; the only action taken is to
    cancel the timer.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_close(error_code const& /*ec*/)
{
    cancel_timer();
}

/** (Re)arm the timer and wait for it asynchronously.

    The expiry is 3 seconds for loopback remote addresses and 30 seconds
    for everything else. If setting the expiry throws, the connection is
    failed with the reported error and no wait is started. on_timer() runs
    on the strand when the wait completes.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::start_timer()
{
    // Max seconds without completing a message
    static constexpr std::chrono::seconds remoteLimit{30};
    static constexpr std::chrono::seconds loopbackLimit{3};

    try
    {
        if (remote_endpoint().address().is_loopback())
            timer_.expires_after(loopbackLimit);
        else
            timer_.expires_after(remoteLimit);
    }
    catch (boost::system::system_error const& e)
    {
        fail(e.code(), "start_timer");
        return;
    }

    auto self = impl().shared_from_this();
    timer_.async_wait(bind_executor(
        strand_,
        std::bind(
            &BaseWSPeer<Handler, Impl>::on_timer,
            std::move(self),
            std::placeholders::_1)));
}

/** Cancel the timer, discarding any error.

    A `system_error` thrown by the cancel is swallowed on purpose: callers
    use this during shutdown and failure handling and have nothing useful
    to do with it.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::cancel_timer()
{
    try
    {
        timer_.cancel();
    }
    catch (boost::system::system_error const& /*unused*/)
    {
        // Deliberately ignored.
    }
}

/** Completion handler for the ping sent from on_timer().

    An aborted operation is ignored entirely. Otherwise the ping is no
    longer considered active, and any error fails the connection.

    @param ec Result of the ping operation.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_ping(error_code const& ec)
{
    if (ec == boost::asio::error::operation_aborted)
        return;

    ping_active_ = false;
    if (ec)
        fail(ec, "on_ping");
}

/** Control-frame callback installed on the stream by run().

    Only pong frames are acted upon. A pong whose payload equals the
    payload of the ping sent from on_timer() clears close_on_timer_; any
    other pong is merely logged.

    @param kind    The type of control frame that was received.
    @param payload The payload carried by the frame.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_ping_pong(
    boost::beast::websocket::frame_type kind,
    boost::beast::string_view payload)
{
    if (kind != boost::beast::websocket::frame_type::pong)
        return;

    // on_timer() fills payload_ with random bytes, which may include zero
    // bytes. The view must therefore use the explicit size; measuring up to
    // the first NUL would truncate it and make a correct pong look like a
    // mismatch.
    boost::beast::string_view const expected(payload_.data(), payload_.size());
    if (payload == expected)
    {
        close_on_timer_ = false;
        JLOG(this->j_.trace()) << "got matching pong";
    }
    else
    {
        JLOG(this->j_.trace()) << "got pong";
    }
}

/** Completion handler for the timer wait started in start_timer().

    An aborted wait is ignored and any other error fails the connection.
    On a normal expiry:
      - if close_on_timer_ and ping_active_ are both set, the connection
        is failed with `timed_out`;
      - otherwise the timer is re-armed, both flags are set, the ping
        payload is filled with fresh random bytes and a ping is sent.

    @param ec Result of the timer wait.
*/
template <class Handler, class Impl>
void
BaseWSPeer<Handler, Impl>::on_timer(error_code ec)
{
    if (ec == boost::asio::error::operation_aborted)
        return;

    if (ec)
    {
        fail(ec, "timer");
        return;
    }

    if (close_on_timer_ && ping_active_)
    {
        fail(
            boost::system::errc::make_error_code(
                boost::system::errc::timed_out),
            "timer");
        return;
    }

    start_timer();
    close_on_timer_ = true;
    ping_active_ = true;
    // cryptographic is probably overkill..
    beast::rngfill(payload_.begin(), payload_.size(), crypto_prng());

    auto self = impl().shared_from_this();
    impl().ws_.async_ping(
        payload_,
        bind_executor(
            strand_,
            std::bind(
                &BaseWSPeer::on_ping, std::move(self), std::placeholders::_1)));
    JLOG(this->j_.trace()) << "sent ping";
}

/** Handle a failed operation.

    Must be called on the strand. The timer is always cancelled. The first
    error that is not `operation_aborted` is stored in `ec_`, logged at
    trace level together with `what`, and the lowest-layer socket is
    closed; later calls do nothing beyond cancelling the timer.

    @param ec   The error that occurred.
    @param what Short description of the failed operation, used in the log.
*/
template <class Handler, class Impl>
template <class String>
void
BaseWSPeer<Handler, Impl>::fail(error_code ec, String const& what)
{
    XRPL_ASSERT(
        strand_.running_in_this_thread(),
        "ripple::BaseWSPeer::fail : strand in this thread");

    cancel_timer();

    if (ec_ || ec == boost::asio::error::operation_aborted)
        return;

    ec_ = ec;
    JLOG(this->j_.trace()) << what << ": " << ec.message();
    // `ec` is reused to receive the result of the close, which is not
    // inspected.
    ripple::get_lowest_layer(impl().ws_).socket().close(ec);
}

}  // namespace ripple

#endif
